\section{\dbsp and richer query languages}\label{sec:extensions}

The \dbsp language can express a richer class of streaming computations (both incremental and non-incremental)
than those covered so far. In this section we enumerate several important classes of
queries that can be implemented in \dbsp, and thus can be incrementalized using Algorithm~\ref{algorithm-inc}.

\subsection{Multisets and bags}

In \secref{sec:relational} we have shown how to implement the relational algebra on sets.
Some SQL queries, however, produce \emph{multisets}, e.g., \code{UNION ALL}.
Since \zrs generalize multisets and bags, it is trivial to implement query
operators on multisets, simply by omitting $\distinct$ operator invocations.
For example, SQL \code{UNION} is \zr addition followed by $\distinct$,
whereas \code{UNION ALL} is just \zr addition.  Indeed, the SQL to \dbsp compiler
mentioned in \refsec{sec:implementation} handles full standard SQL, including
all multiset queries.
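
As a small illustration, consider two hypothetical \zrs (chosen here only for illustration)
$U = \{ \code{joe} \mapsto 1, \code{anne} \mapsto 1 \}$ and
$V = \{ \code{joe} \mapsto 1, \code{bob} \mapsto 1 \}$, both of which represent sets.
Then \code{UNION ALL} and \code{UNION} would produce, respectively:
\[ U + V = \{ \code{joe} \mapsto 2, \code{anne} \mapsto 1, \code{bob} \mapsto 1 \}, \]
\[ \distinct(U + V) = \{ \code{joe} \mapsto 1, \code{anne} \mapsto 1, \code{bob} \mapsto 1 \}. \]
The weight $2$ of \code{joe} in the first result records that the row appears twice in the multiset.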

\subsection{Aggregation}\label{sec:aggregation}

Aggregation in SQL applies a function $a$ to a set of values of type $A$ producing a ``scalar''
result with some result type $B$: $a: 2^A \to B$.  In \dbsp an aggregation function has
a signature $a: \Z[A] \to B$.

The SQL \texttt{COUNT} aggregation function is implemented on \zrs by $a_\texttt{COUNT} : \Z[A] \to \Z$, which
computes a \emph{sum} of all the element weights: $a_\texttt{COUNT}(s) = \sum_{x \in s} s[x]$.
The SQL \texttt{SUM} aggregation function is implemented on \zrs by $a_\texttt{SUM}: \Z[\R] \to \R$ which
performs a \emph{weighted sum} of all (real) values: $a_\texttt{SUM}(s) = \sum_{x \in s} x \times s[x]$.
Both these implementations work correctly for sets and multisets.

With this definition the aggregation functions $a_\texttt{COUNT}$ and $a_\texttt{SUM}$ are in
fact linear transformations from their input groups ($\Z[A]$ and $\Z[\R]$, respectively) to their result groups ($\Z$ and $\R$, respectively).
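
For instance, take the hypothetical multiset $s = \{ 3 \mapsto 2, 5 \mapsto 1 \}$ (the value $3$ twice and $5$ once)
and the hypothetical change $\Delta = \{ 3 \mapsto -1, 7 \mapsto 1 \}$; these values are purely illustrative.
The definitions above give:
\[ a_\texttt{COUNT}(s) = 2 + 1 = 3, \qquad a_\texttt{SUM}(s) = 3 \times 2 + 5 \times 1 = 11, \]
\[ a_\texttt{COUNT}(\Delta) = -1 + 1 = 0, \qquad a_\texttt{SUM}(\Delta) = 3 \times (-1) + 7 \times 1 = 4. \]
Since $s + \Delta = \{ 3 \mapsto 1, 5 \mapsto 1, 7 \mapsto 1 \}$, linearity can be checked directly:
\[ a_\texttt{COUNT}(s + \Delta) = 3 = 3 + 0, \qquad a_\texttt{SUM}(s + \Delta) = 15 = 11 + 4. \]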

If the output of the \dbsp circuit is allowed to be such a ``scalar'' value, then aggregation
with a linear function is simply function application, and thus linear.
However, in general, composing multiple queries
requires the result of an aggregation to be a singleton \zr (containing a single value),
and not a scalar value.  In this case the aggregation function is implemented in
\dbsp as the composition of the actual aggregation and the
$\makeset: A \to \Z[A]$ function,
which converts a scalar value of type $A$ to a singleton \zr, defined as follows:
$\makeset(x) \defn \{ x \mapsto 1 \}$.

In conclusion, the following SQL query:
\code{SELECT SUM(c) FROM I}
is implemented as the following circuit:\footnote{The actual SQL \texttt{SUM} aggregate is
even more complicated, because it needs to skip \texttt{NULL}s,
and it returns \texttt{NULL} for an empty input set;
this can be implemented in \dbsp.}

\begin{tikzpicture}[auto,>=latex]
  \node[] (I) {\code{I}};
  \node[block, right of=I] (pi) {$\pi_\texttt{C}$};
  \node[block, right of=pi] (a) {$a_\texttt{SUM}$};
  \draw[->] (I) -- (pi);
  \draw[->] (pi) -- (a);
  \node[block, right of=a, node distance=1.5cm] (m) {$\makeset$};
  \node[right of=m] (O) {\code{O}};
  \draw[->] (a) -- (m);
  \draw[->] (m) -- (O);
\end{tikzpicture}

The lifted incremental version of this circuit is interesting: since $\pi$
and $a_\texttt{SUM}$ are linear, they are equivalent to their own incremental
versions.  Although $\inc{(\lift \makeset)} = \D \circ \lift{\makeset} \circ \I$
cannot be simplified, it is nevertheless efficient, doing only $O(1)$ work per
invocation, since its input and output are singleton values.
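
As a sketch with hypothetical values, suppose the changes of the scalar sum arriving at
$\inc{(\lift \makeset)}$ are $11$ at time $0$ and $4$ at time $1$.  The three stages then
produce the following values at times $0$ and $1$:
\[ \I: \; 11, \; 15 \qquad\qquad \lift{\makeset}: \; \{ 11 \mapsto 1 \}, \; \{ 15 \mapsto 1 \} \]
\[ \D: \; \{ 11 \mapsto 1 \}, \; \{ 15 \mapsto 1, \, 11 \mapsto -1 \} \]
Each output change retracts the previous singleton and inserts the new one, so it contains
at most two elements.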

%An aggregation function such as \texttt{AVG} can be written as the composition of
%a linear function that computes a pair of values using
%\texttt{SUM} and \texttt{COUNT}, followed by a division and a call to \makeset.

%\begin{lstlisting}[language=SQL]
%SELECT AVG(c) FROM I
%\end{lstlisting}
%
%\begin{tikzpicture}[auto,>=latex]
%  \node[] (I) {\code{I}};
%  \node[block, right of=I] (pi) {$\pi_\texttt{C}$};
%  \node[block, right of=pi, node distance=1.4cm] (sc) {$(a_\texttt{SUM}, a_\texttt{COUNT})$};
%  \draw[->] (I) -- (pi);
%  \draw[->] (pi) -- (sc);
%  \node[block, right of=sc, node distance=1.8cm] (m) {$\makeset$};
%  \node[block, right of=m, node distance=1.2cm] (div) {$\sigma_/$};
%  \node[right of=div] (O) {\code{O}};
%  \draw[->] (sc) -- (m);
%  \draw[->] (m) -- (div);
%  \draw[->] (div) -- (O);
%\end{tikzpicture}

Finally, some aggregate functions, such as \code{MIN}, are
\emph{not} linear: for handling deletions
they need to track the full set.  One way to implement in \dbsp the lifted
incremental version of such aggregate functions is
by ``brute force'', using the formula $\inc{(\lift a_\texttt{MIN})}
= \D \circ \lift{a_\texttt{MIN}} \circ \I$.  Such an implementation performs
$O(|DB|)$ work at each invocation.  However, schemes
such as Reactive Aggregator~\cite{tangwongsan-vldb15} can be implemented as custom \dbsp operators to bring
the amortized cost per update to $O(\log |DB|)$.  This approach is similar to the
customized implementation of the $\distinct$ operator, and it is another facet of
the modularity of \dbsp, which allows optimized operator implementations to be
mixed and matched.
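
To illustrate why deletions are the problematic case for \code{MIN}, consider the hypothetical set
$s = \{ 3 \mapsto 1, 7 \mapsto 1, 9 \mapsto 1 \}$ and the change $\Delta = \{ 3 \mapsto -1 \}$,
which deletes the current minimum:
\[ a_\texttt{MIN}(s) = 3, \qquad a_\texttt{MIN}(s + \Delta) = a_\texttt{MIN}(\{ 7 \mapsto 1, 9 \mapsto 1 \}) = 7. \]
The new result $7$ cannot be derived from the old result $3$ and $\Delta$ alone; it requires
access to the remaining elements of $s$.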

\subsection{Grouping; indexed relations}\label{sec:grouping}

Let $K$ be a set of ``key values.''
Consider the mathematical structure of finite maps from $K$
to \zrs: $K \to \Z[A] = \Z[A][K]$.
We call values $i$ of this structure \defined{indexed \zrs}: for
each key $k \in K$, $i[k]$ is a \zr.  Because
the codomain $\Z[A]$ is an abelian group, this structure is itself
an abelian group.

We use this structure to implement the SQL \texttt{GROUP BY} operator in \dbsp.
Consider a \defined{partitioning function}
$p: A \to K$ that assigns a key to any value in $A$.  We define the grouping function
$G_p: \Z[A] \to \Z[A][K]$ as $G_p(a)[k] \defn \sum_{x \in a.p(x)=k}a[x] \cdot x$
(just map each element of the input $a$ to the \zr grouping corresponding to its key).
When applied to a \zr $a$ this function returns an indexed \zr, where each element
is a \defined{grouping}:\footnote{We use
``group'' for the algebraic structure and ``grouping'' for the result of \code{GROUP BY}.} for each key $k$ a
grouping is a \zr containing all elements of $a$ that map to $k$
(as in SQL, groupings are multisets).
Consider our example \zr $R$ from \refsec{sec:relational},
and a key function $p(s)$ that returns the first letter of the string
$s$. Then we have that $G_p(R) = \{ \code{j} \mapsto \{ \code{joe}
\mapsto 1 \}, \code{a} \mapsto \{ \code{anne} \mapsto -1 \} \}$,
i.e., grouping with this key function produces an indexed \zr with two groupings, each
of which contains a \zr with one element.

The grouping function $G_p$ is linear for any key function $p$!
It follows that the group-by implementation in \dbsp is automatically
incremental: given some changes
to the input relation we can apply the partitioning function
to each row changed in the input separately to compute how each grouping changes.
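
As an illustration, take $R = \{ \code{joe} \mapsto 1, \code{anne} \mapsto -1 \}$ (the \zr consistent
with $G_p(R)$ above) and a hypothetical change
$\Delta R = \{ \code{jack} \mapsto 1, \code{anne} \mapsto 1 \}$.  Applying $G_p$ to the change alone gives
\[ G_p(\Delta R) = \{ \code{j} \mapsto \{ \code{jack} \mapsto 1 \}, \code{a} \mapsto \{ \code{anne} \mapsto 1 \} \}, \]
and adding it to the previous result yields the same indexed \zr as regrouping the whole updated input:
\[ G_p(R) + G_p(\Delta R) = \{ \code{j} \mapsto \{ \code{joe} \mapsto 1, \code{jack} \mapsto 1 \} \} = G_p(R + \Delta R). \]
The grouping for key \code{a} cancels to the empty \zr, so the key disappears from the result.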

Notice that, unlike SQL, \dbsp can naturally express computations
on indexed \zrs, since they are just an instance of a group structure.
In \dbsp one does not need to follow
grouping by aggregation, and \dbsp can represent nested groupings
of arbitrary depth.  Our definition of incremental
computation is only concerned with incrementality in the
\emph{outermost} structures.  We leave it to future work to
explore an appropriate definition of incremental computation
on the \emph{inner} relations.


\subsection{\texttt{GROUP BY-AGGREGATE}, flatmap}

Grouping in SQL is almost always followed by aggregation.
Let us consider an aggregation function $a: (K \times \Z[A]) \to B$ that produces values
in some group $B$, and an indexed relation of type $\Z[A][K]$, as defined above in~\refsec{sec:grouping}.
The nested relation aggregation operator $Agg_a: \Z[A][K] \to B$ applies $a$
to the contents of each grouping independently and adds the results:
$Agg_a(g) \defn \sum_{k \in K} a(k, g[k])$.  To apply this
to our example, let us compute the equivalent of \texttt{GROUP BY-COUNT}; we use
the following aggregation function $count: K \times \Z[A] \to \Z[K \times \Z]$, $count(k, s) =
\makeset((k, a_\texttt{COUNT}(s)))$, using the \zr counting function $a_\texttt{COUNT}$
from~\refsec{sec:aggregation}; the notation $(a,b)$ denotes a pair of values $a$ and $b$.
Then we have $Agg_{count} (G_p(R)) = \{ (\code{j}, 1) \mapsto 1,
(\code{a}, -1) \mapsto 1 \}$.
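
Unfolding the definitions step by step (a sketch using only the functions defined above):
\[ Agg_{count}(G_p(R)) = count(\code{j}, \{ \code{joe} \mapsto 1 \}) + count(\code{a}, \{ \code{anne} \mapsto -1 \}) \]
\[ = \makeset((\code{j}, 1)) + \makeset((\code{a}, -1)) = \{ (\code{j}, 1) \mapsto 1 \} + \{ (\code{a}, -1) \mapsto 1 \}. \]
Note that the negative weight of \code{anne} shows up as the count $-1$ inside the output tuple,
while the output weight itself is $1$, because $\makeset$ always produces weight $1$.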

%\subsection{\texttt{UNNEST} (flatmap)}

A very useful operation on nested relations is \defined{flatmap} (or \code{UNNEST} in SQL), which is
essentially the inverse of grouping, converting an indexed
\zr into a \zr: $\mbox{flatmap}: \Z[A][K] \to \Z[K \times A]$.
$\mbox{flatmap}$ is in fact a particular instance of aggregation,
using the aggregation function $a: K \times \Z[A] \to \Z[K \times A]$
defined by $a(k, s) = \sum_{x \in s} s[x] \cdot (k, x).$
For our previous example, $\mbox{flatmap}(G_p(R)) = \{ (\code{j}, \code{joe}) \mapsto 1,
(\code{a}, \code{anne}) \mapsto -1 \}$.

%If we use an aggregation function $a: K \times Z[A]$ that is linear in its
%second argument, then the aggregation operator $Agg_a$ is linear, and
%thus fully incremental.  As a consequence, $\mbox{flatmap}$ is linear.
%However, many practical aggregation functions for nested relations are in fact
%not linear; an example is the $count$ function above, which is not linear
%since it uses the $\makeset$ non-linear function.  Nevertheless, while
%the incremental evaluation of such functions is not fully incremental,
%it is at least partly incremental: when applying a change to groupings, the aggregation
%function only needs to be re-evaluated \emph{for groupings that have changed}.

%\subsection{Antijoin}\label{sec:antijoin}\index{antijoin}
%
%Antijoins arise in the implementation of Datalog programs with stratified negation.
%Consider the following program:
%
%\begin{lstlisting}[language=ddlog,basicstyle=\small]
%O(v, z) :- I1(v, z), not I2(v).
%\end{lstlisting}
%
%The semantics of such a rule is defined in terms of joins and set difference.
%This rule is equivalent with the following pair of rules:
%
%\begin{lstlisting}[language=ddlog,basicstyle=\small]
%C(v, z) :- I1(v, z), I2(v).
%O(v, z) :- I1(v, z), not C(v, z).
%\end{lstlisting}
%
%This transformation reduces an antijoin to a join
%followed by a set difference.  This produces the following \dbsp circuit:
%
%\begin{tikzpicture}[auto,>=latex]
%  \node[] (i1) {\code{I1}};
%  \node[below of=i1, node distance=.5cm] (i2) {\code{I2}};
%  \node[block, right of=i1, node distance=1.5cm] (join) {$\bowtie$};
%  \node[block, shape=circle, inner sep=0in, right of=join] (m) {---};
%  \node[block, above of=m, shape=circle, inner sep=0in, node distance=.6cm] (plus) {$+$};
%  \node[block, right of=plus, node distance=1cm] (distinct) {$\distinct$};
%  \node[right of=distinct, node distance=1cm] (output) {\code{O}};
%  \draw[->] (i1) -- node (tap) {} (join);
%  \draw[->] (i2) -| (join);
%  \draw[->] (join) -- (m);
%  \draw[->] (m) -- (plus);
%  \draw[->] (tap.south) |- (plus);
%  \draw[->] (plus) -- (distinct);
%  \draw[->] (distinct) -- (output);
%\end{tikzpicture}
%
\subsection{Streaming joins}

Consider a binary query $T(s, t) = \I(s)~~\lift{\bowtie}~~t$.  This is the
\emph{relation-to-stream join} operator supported by streaming databases like Kafka's ksqlDB~\cite{jafarpour-edbt19}.
Stream $s$ carries changes to a relation, while $t$ carries arbitrary data, e.g., logs
or telemetry data points. $T$ discards values from $t$ after matching them against the accumulated contents of the relation $\I(s)$.
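
As a sketch, assume that values on both streams are pairs whose first component is the join key
and that $\bowtie$ is an equi-join on that key; the concrete values below are hypothetical.
Since the lifted join acts pointwise, $T(s, t)[n] = \I(s)[n] \bowtie t[n]$.  Let
$s[0] = \{ (1, \code{joe}) \mapsto 1 \}$, $s[1] = \{ (2, \code{anne}) \mapsto 1 \}$,
$t[0] = \{ (2, \code{click}) \mapsto 1 \}$, and $t[1] = \{ (1, \code{view}) \mapsto 1 \}$.  Then
\[ T(s, t)[0] = \{ (1, \code{joe}) \mapsto 1 \} \bowtie \{ (2, \code{click}) \mapsto 1 \} = \{\}, \]
\[ T(s, t)[1] = \{ (1, \code{joe}) \mapsto 1, (2, \code{anne}) \mapsto 1 \} \bowtie \{ (1, \code{view}) \mapsto 1 \}
   = \{ (1, \code{joe}, \code{view}) \mapsto 1 \}. \]
The value $(2, \code{click})$ is never matched with $(2, \code{anne})$, which arrives one step later,
because $t[0]$ is not retained.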

%\item[Explicit delay:]
%So far the $\zm$ operator was only used in integration or
%differentiation.  However, it can be exposed as a primitive operation that
%can be applied to streams.  This enables programs that can
%perform convolutions and time-based window computations over streams:

\paragraph{Streaming window queries}

Streaming databases often organize the contents of streams into windows,
which store a subset of data points with a predefined range of timestamps.
%% The circuit below computes a \emph{fixed-size sliding-window aggregate}
%% over the last four timestamps with aggregations defined by the $T_i$ functions.
%%
%% \begin{center}
%% \begin{tikzpicture}[>=latex]
%%     \node[] (input) {$s$};
%%     \node[block, right of=input, node distance=1.5cm] (f0) {$T_0$};
%%     \node[below of=input, node distance=.5cm] (fake) {};
%%     \node[block, right of=fake, node distance=1cm] (z0) {$\zm$};
%%     \node[right of=input, node distance=.35cm] (tap) {};
%%     \node[block, right of=f0, node distance=1.5cm] (f1) {$T_1$};
%%     \node[block, right of=z0, node distance=1.2cm] (z1) {$\zm$};
%%     \node[block, right of=f1, node distance=1.5cm] (f2) {$T_2$};
%%     \node[block, right of=z1, node distance=1.5cm] (z2) {$\zm$};
%%     \draw[->] (input) -- (f0);
%%     \draw[->] (tap.center) |- (z0);
%%     \draw[->] (z0) -| (f0);
%%     \draw[->] (f0) -- (f1);
%%     \draw[->] (z0) -- (z1);
%%     \draw[->] (z1) -| (f1);
%%     \draw[->] (f1) -- (f2);
%%     \draw[->] (z1) -- (z2);
%%     \draw[->] (z2) -| (f2);
%%     \node[right of=f2] (output) {$o$};
%%     \draw[->] (f2) -- (output);
%% \end{tikzpicture}
%% \end{center}
%%
In practice, windowing is usually based on physical timestamps attached to
stream values rather than on logical (transaction) time.
For instance, the CQL~\cite{arasu-tr02} query
``\texttt{SELECT * FROM events [RANGE 1 hour]}'' returns all events received
within the last hour.  The corresponding circuit (on the left)
takes input stream $s \in \stream{\Z[A]}$ and an additional
input $\theta \in \stream{\mathbb{R}}$ that carries the value of the current
time.

\begin{tabular}{m{3cm}m{0.5cm}m{3cm}}
\begin{tikzpicture}[>=latex]
    \node[] (input) {$s$};
    \node[above of=input, node distance=.5cm] (t) {$\theta$};
    \node[block, right of=input] (i) {$\I$};
    \node[block, right of=i] (w) {$W$};
    \node[right of=w] (output) {$o$};
    \draw[->] (input) -- (i);
    \draw[->] (i) -- (w);
    \draw[->] (w) -- (output);
    \draw[->] (t) -| (w);
\end{tikzpicture}
&
$\cong$
&
\begin{tikzpicture}[>=latex]
    \node[] (input) {$s$};
    \node[above of=input, node distance=.5cm] (t) {$\theta$};
    \node[block, shape=circle, right of=input, inner sep=0pt] (plus) {$+$};
    \node[block, right of=plus] (w) {$W$};
    \node[right of=w] (output) {$o$};
    \node[block, below of=plus, node distance=.6cm] (z) {$\zm$};
    \draw[->] (input) -- (plus);
    \draw[->] (plus) -- (w);
    \draw[->] (t) -| (w);
    \draw[->] (w) -- node (mid) {} (output);
    \draw[->] (mid.center) |-  (z);
    \draw[->] (z) -- (plus);
\end{tikzpicture} \\
\end{tabular}

\noindent{}where the \emph{window operator} $W$ prunes input \zrs, only keeping values
with timestamps at most an hour behind $\theta[t]$.  Assuming $ts: A \to \mathbb{R}$ returns
the physical timestamp of a value, $W$ is defined as $W(v, \theta)[t] \defn \{x \in v[t] .
ts(x) \geq \theta[t] - 1\,\mathrm{hr}\}$.  Assuming $\theta$ increases monotonically, $W$
can be moved inside integration, resulting in the circuit on the right, which uses
bounded memory to compute a window of an unbounded stream.
This circuit is a building block of a large family of window queries, including
window joins and window aggregation (e.g., SQL \texttt{OVER} queries).
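
The equivalence of the two circuits can be sketched as follows, writing $W_{\theta[t]}(z)$ as
shorthand (introduced here) for the pruning of a single \zr $z$ at time $t$, viewed as a filter
that preserves weights.  The left and right circuits compute, respectively,
\[ o_{\mathit{left}}[t] = W_{\theta[t]}(\I(s)[t]), \qquad
   o_{\mathit{right}}[t] = W_{\theta[t]}(s[t] + o_{\mathit{right}}[t-1]), \]
where $o_{\mathit{right}}[t-1]$ is taken to be $0$ for $t = 0$.  Such a filter is linear in its \zr
argument, and if $\theta[t-1] \leq \theta[t]$ then any value pruned at time $t-1$ would also be
pruned at time $t$, i.e., $W_{\theta[t]} \circ W_{\theta[t-1]} = W_{\theta[t]}$.  By induction on $t$:
\[ o_{\mathit{right}}[t] = W_{\theta[t]}(s[t]) + W_{\theta[t]}(W_{\theta[t-1]}(\I(s)[t-1]))
   = W_{\theta[t]}(s[t] + \I(s)[t-1]) = o_{\mathit{left}}[t]. \]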

\begin{comment}
\item[Weakening assumptions]

The theory as presented requires streams to be over group elements.  However,
this is necessary only if we want to automatically compute incremental
versions of the operators --- the addition and negation operations are
required for building $\I$ and $\D$.  The streaming circuits model
works very well on much simpler algebraic structures.  The $0$ element
is still needed to define $\zm$ and time-invariance.  Circuits that
mix streams over groups with arbitrary streams are well-defined.  However,
the zero-preservation property is required even for such general computations,
for the time-invariance of the resulting circuits.
\end{comment}

\subsection{Relational while queries}

\dbsp can express programs that go beyond Datalog:
see the non-monotonic semantics for Datalog$^\neg$ and
Datalog$^{\neg\neg}$~\cite{Abiteboul-book95}.
We implement the following
``while'' program, where $Q$ is an arbitrary query:
{\small
\begin{lstlisting}[language=Pascal]
x := i;
while (x changes)
    x := Q(x);
\end{lstlisting}}
The \dbsp implementation of this program is:

%$$\lambda i. \int[\D[\fix{\xi}{Q(\delta_0(i)+\zm(\xi))}]]$$
\begin{center}
\begin{tikzpicture}[>=latex]
  \node[] (input) {$i$};
  \node[block, right of=input] (delta) {$\delta_0$};
  \node[block, circle, right of=delta, inner sep=0cm] (p) {$+$};
  \node[block, right of=p] (Q) {$\lift Q$};
  \node[block, right of=Q] (D) {$\D$};
  \node[block, right of=D] (S) {$\int$};
  \node[right of=S] (output)  {$x$};
  \node[block, below of=p, node distance=.7cm] (z) {$\zm$};
  \draw[->] (input) -- (delta);
  \draw[->] (delta) -- (p);
  \draw[->] (p) -- (Q);
  \draw[->] (Q) -- node (mid) {} (D);
  \draw[->] (D) -- (S);
  \draw[->] (mid.center) |- (z);
  \draw[->] (S) -- (output);
  \draw[->] (z) -- (p);
\end{tikzpicture}
\end{center}
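
The behavior of this circuit can be unfolded as follows, writing $\xi$ (a name introduced here)
for the stream produced by $\lift Q$.  Since $\delta_0(i)$ carries $i$ at time $0$ and $0$ afterwards,
and $\zm$ delays its input by one step:
\[ \xi[0] = Q(i), \qquad \xi[t] = Q(\xi[t-1]) \mbox{ for } t > 0. \]
Differentiation gives $\D(\xi)[t] = \xi[t] - \xi[t-1]$, which becomes $0$ once $\xi$ stops changing.
If $n$ is the first time with $\xi[n+1] = \xi[n]$, the sum computed by $\int$ telescopes:
\[ x = \sum_{t \leq n} (\xi[t] - \xi[t-1]) = \xi[n], \]
taking $\xi[-1] = 0$, so $x$ is the value of the loop variable when the ``while'' program exits.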

This circuit can be converted to a streaming circuit that computes over a stream of values $i$
by lifting it; it can then be incrementalized using Algorithm~\ref{algorithm-inc} to compute on changes of $i$:

\begin{center}
\begin{tikzpicture}[>=latex]
  \node[] (input) {$\Delta i$};
  \node[block, right of=input] (delta) {$\lift{\delta_0}$};
  \node[block, circle, right of=delta, inner sep=0cm] (p) {$+$};
  \node[block, right of=p] (Q) {$\inc{(\lift{\lift{Q}})}$};
  \node[block, right of=Q, node distance=1.5cm] (D) {$\lift{\D}$};
  \node[block, right of=D, node distance=1.1cm] (S) {$\lift{\int}$};
  \node[right of=S, node distance=1.2cm] (output)  {$\Delta x$};
  \node[block, below of=p, node distance=.7cm] (z) {$\lift{\zm}$};
  \draw[->] (input) -- (delta);
  \draw[->] (delta) -- (p);
  \draw[->] (p) -- (Q);
  \draw[->] (Q) -- node (mid) {} (D);
  \draw[->] (D) -- (S);
  \draw[->] (mid.center) |- (z);
  \draw[->] (S) -- (output);
  \draw[->] (z) -- (p);
\end{tikzpicture}
\end{center}

At runtime the execution of this circuit is not guaranteed to terminate;
however, if the circuit does terminate, it will produce the correct
output, i.e., the least fixpoint of $Q$ that includes~$i$.
